Pulsar Sink 入门指南
阅读本文需要约 20 分钟。
Apache Pulsar 是一个分布式的发布-订阅消息系统,sink 是 Pulsar 的一个组件,用于将数据导入至其他系统。本文介绍了 sink 的功能,并演示了如何创建与使用 JDBC sink 与 MySQL 进行连接。
Sink 命令
$ bin/pulsar-admin sink create <options>
常用参数
-a,--archive : 指定 sink 的 NAR 包
--classname : 指定 sink 的类名称
-i,--inputs : 指定 sink 的 topic,多个 topic 用逗号隔开
--name : 指定 sink 的名称
--namespace : 指定 sink 的命名空间
--parallelism : 指定 sink 的并发数
--sink-config-file : 指定 sink 的 yaml 配置文件
--tenant : 指定 sink 的租户
$ bin/pulsar-admin sink update <options>
常用参数
-a,--archive : 指定 sink 的 NAR 包
--classname : 指定 sink 的类名称
-i,--inputs : 指定 sink 的 topic,多个 topic 用逗号隔开
--name : 指定 sink 的名称
--namespace : 指定 sink 的命名空间
--parallelism : 指定 sink 的并发数
--sink-config-file : 指定 sink 的 yaml 配置文件
--tenant : 指定 sink 的租户
$ bin/pulsar-admin sink delete <options>
常用参数
--name : 指定 sink 的名称
--namespace : 指定 sink 的命名空间
--tenant : 指定 sink 的租户
$ bin/pulsar-admin sink list <options>
常用参数
--namespace : 指定 sink 的命名空间
--tenant : 指定 sink 的租户
$ bin/pulsar-admin sink get <options>
常用参数
--name : 指定 sink 的名称
--namespace : 指定 sink 的命名空间
--tenant : 指定 sink 的租户
$ bin/pulsar-admin sink status <options>
常用参数
--instance-id : 指定 sink 的实例 ID
如果未指定,则获取所有实例的状态
--name : 指定 sink 的名称
--namespace : 指定 sink 的命名空间
--tenant : 指定 sink 的租户
$ bin/pulsar-admin sink stop <options>
常用参数
--instance-id : 指定 sink 的实例 ID
如果未指定,则停止所有实例的状态
--name : 指定 sink 的名称
--namespace : 指定 sink 的命名空间
--tenant : 指定 sink 的租户
$ bin/pulsar-admin sink start <options>
常用参数
--instance-id : 指定 sink 的实例 ID
如果未指定,则启动所有实例
--name : 指定 sink 的名称
--namespace : 指定 sink 的命名空间
--tenant : 指定 sink 的租户
$ bin/pulsar-admin sink restart <options>
常用参数
--instance-id : 指定 sink 的实例 ID
如果未指定,则获取所有实例的状态
--name : 指定 sink 的名称
--namespace : 指定 sink 的命名空间
--tenant : 指定 sink 的租户
Localrun
本地运行
在本地运行一个 Pulsar IO sink connector,方便调试。
$ bin/pulsar-admin sink localrun <options>
常用参数
-a,--archive : 指定 source 的 NAR 包
--classname : 指定 sink 的类名称
-i,--inputs : 指定 sink 的 topic,多个 topic 用逗号隔开
--name : 指定 sink 的名称
--namespace : 指定 sink 的命名空间
--parallelism : 指定 sink 的并发数
--sink-config-file : 指定 sink 的 yaml 配置文件
--tenant : 指定 sink 的租户
环境搭建
本示例创建JDBC sink,并使用 JDBC sink 与 MySQL 进行连接。
准备工作
本示例在 Mac 系统上进行。在开始之前,需要安装以下依赖:
Docker (docker.com)
Java (https://www.oracle.com/technetwork/java/javase/downloads/index.html)
Maven (https://archive.apache.org/dist/maven/maven-3/)
Git (https://www.linode.com/docs/development/version-control/how-to-install-git-on-linux-mac-and-windows/)
开始搭建
搭建步骤总计 6 步。
1. 安装与启动 MySQL
(1)拉取 MySQL 镜像。
$ docker pull mysql:5.7
(2)启动 MySQL。
$ docker run -d -it --rm \
--name pulsar-mysql \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=jdbc \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw \
mysql:5.7
提示:
-d : 以后台模式运行。
-it : 以交互模式运行,并为 docker 分配一个伪输入终端。
--rm : docker 停止后,自动删除 docker
--name : 指定 docker 名称。
本示例指定 docker 名称为 pulsar-mysql。
-p : 指定端口。
本示例指定对外暴露 3306 端口。
-e : 指定环境变量。
本示例为 MySQL 指定以下信息:
root 用户的密码为 jdbc
普通用户的名称为 mysqluser
普通用户的密码为mysqlpw
(3)验证是否成功启动。
$ docker logs -f pulsar-mysql
如果出现以下信息,则说明成功启动。
2019-07-29T01:50:05.116660Z 0 [Note] InnoDB: Waiting for purge to start
2019-07-29T01:50:05.168247Z 0 [Note] InnoDB: 5.7.26 started; log sequence number 12363846
2019-07-29T01:50:05.168596Z 0 [Note] InnoDB: Loading buffer pool(s) from /var/lib/mysql/ib_buffer_pool
2019-07-29T01:50:05.168855Z 0 [Note] Plugin 'FEDERATED' is disabled.
2019-07-29T01:50:05.173901Z 0 [Note] InnoDB: Buffer pool(s) load completed at 190729 1:50:05
2019-07-29T01:50:05.174778Z 0 [Note] Found ca.pem, server-cert.pem and server-key.pem in data directory. Trying to enable SSL support using them.
2019-07-29T01:50:05.175045Z 0 [Warning] CA certificate ca.pem is self signed.
2019-07-29T01:50:05.176942Z 0 [Note] Server hostname (bind-address): '*'; port: 3306
2019-07-29T01:50:05.177017Z 0 [Note] IPv6 is available.
2019-07-29T01:50:05.178937Z 0 [Note] - '::' resolves to '::';
2019-07-29T01:50:05.178998Z 0 [Note] Server socket created on IP: '::'.
2019-07-29T01:50:05.181545Z 0 [Warning] Insecure configuration for --pid-file: Location '/var/run/mysqld' in the path is accessible to all OS users. Consider choosing a different directory.
2019-07-29T01:50:05.192955Z 0 [Note] Event Scheduler: Loaded 0 events
2019-07-29T01:50:05.193401Z 0 [Note] mysqld: ready for connections.
Version: '5.7.26' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server (GPL)
2 创建 MySQL 表
为了简化操作,本示例使用 root 用户和密码进入 docker,再创建数据库和表,方便数据写入。
(1)进入 MySQL。
$ docker exec -it pulsar-mysql /bin/bash
mysql -h localhost -uroot -pjdbc
(2)创建数据库和表。
$ create database test_jdbc;
$ use test_jdbc;
$ create table if not exists test_jdbc
(
id INT AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
primary key (id)
)
engine=innodb;
3. 安装与启动 Pulsar
(1)下载并安装 Pulsar。
$ wget https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.4.0/apache-pulsar-2.4.0-bin.tar.gz
$ tar -zxvf apache-pulsar-2.4.0-bin.tar.gz
$ cd apache-pulsar-2.4.0
(2)启动 Pulsar。
$ bin/pulsar standalone -nss
(3)验证是否成功启动。
如果出现以下信息,则说明启动成功。
09:56:22.753 [pulsar-web-44-8] INFO org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
09:56:22.761 [pulsar-web-44-8] INFO org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Created namespace public/default
09:56:22.763 [pulsar-web-44-8] INFO org.eclipse.jetty.server.RequestLog - 192.168.50.140 - - [29/七月/2019:09:56:22 +0800] "PUT /admin/v2/namespaces/public/default HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.4.0" 12
09:56:22.771 [pulsar-web-44-11] INFO org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
09:56:22.777 [pulsar-ordered-OrderedExecutor-1-0-EventThread] INFO org.apache.pulsar.zookeeper.ZooKeeperDataCache - [State:CONNECTED Timeout:30000 sessionid:0x1003d74007c0003 local:/127.0.0.1:61606 remoteserver:localhost/127.0.0.1:2181 lastZxid:167 xid:42 sent:42 recv:44 queuedpkts:0 pendingresp:0 queuedevents:0] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeDataChanged path:/admin/policies/public/default
09:56:22.778 [pulsar-web-44-11] INFO org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Successfully updated the replication clusters on namespace public/default
09:56:22.779 [pulsar-web-44-11] INFO org.eclipse.jetty.server.RequestLog - 192.168.50.140 - - [29/七月/2019:09:56:22 +0800] "POST /admin/v2/namespaces/public/default/replication HTTP/1.1" 204 0 "-" "Pulsar-Java-v2.4.0" 12
4. 增加配置文件
(1)创建 mysql-jdbc-sink.yaml 配置文件。
(2)复制以下内容至 mysql-jdbc-sink.yaml 文件。
以下内容指定了 MySQL 的用户名、密码、链接和表名。
configs:
userName: "root"
password: "jdbc"
jdbcUrl: "jdbc:mysql://127.0.0.1:3306/test_jdbc"
tableName: "test_jdbc"
5. 创建 schema
数据库的表包含 schema 信息,JDBC sink 也支持 schema。
因此,只要构建好 schema,即能直接从 topic 中读取消息,再通过 JDBC 将消息传送至数据库的表,该 schema 与数据库的表一一对应。
以下示例创建 avro-schema 文件。
{
"type": "AVRO",
"schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
"properties": {}
}
提示:
更多关于 AVRO 字段的信息,参阅 AVRO 官网(https://avro.apache.org/docs/1.8.2/) 。
6. 上传 schema,启动 sink
(1)上传 schema 至 test-jdbc topic。
$ bin/pulsar-admin schemas upload test-jdbc -f avro-schema
(2)验证是否上传成功。
$ bin/pulsar-admin schemas get test-jdbc
如果出现以下信息,则说明上传成功。
{
"name": "test-jdbc",
"schema": {
"type": "record",
"name": "Test",
"fields": [
{
"name": "id",
"type": [
"null",
"int"
]
},
{
"name": "name",
"type": [
"null",
"string"
]
}
]
},
"type": "AVRO",
"properties": {}
}
(3)启动 sink。
$ bin/pulsar-admin sink localrun \
--archive connectors/pulsar-io-jdbc-2.4.0.nar \
--inputs test-jdbc \
--name mysql-jdbc-sink \
--sink-config-file connectors/mysql-jdbc-sink.yaml \
--parallelism 1
(4)验证是否启动成功。
如果出现以下信息,则说明启动成功。
10:01:20.357 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x21a4b716, L:/127.0.0.1:61690 - R:localhost/127.0.0.1:6650] Connected through proxy to target broker at tengdeMBP:6650
10:01:20.359 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [test-jdbc][public/default/mysql-jdbc-sink] Subscribing to topic on cnx [id: 0x21a4b716, L:/127.0.0.1:61690 - R:localhost/127.0.0.1:6650]
10:01:20.407 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [test-jdbc][public/default/mysql-jdbc-sink] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
实践
(1)创建 mysql-jdbc-sink,并指定 nar 文件、topic、名称、yaml 配置文件和并发数。
$ bin/pulsar-admin sink create \
--archive connectors/pulsar-io-jdbc-2.4.0.nar \
--inputs test-jdbc \
--name mysql-jdbc-sink \
--sink-config-file connectors/mysql-jdbc-sink.yaml \
--parallelism 1
(2)如果出现以下信息,则说明创建成功。
Created successfully
(1)显示所有 sink。
$ bin/pulsar-admin sink list \
--tenant public \
--namespace default
(2)返回结果显示前文创建的 mysql-jdbc-sink。
[
"mysql-jdbc-sink"
]
(1)显示 sink 的信息。
$ bin/pulsar-admin sink get \
--tenant public \
--namespace default \
--name mysql-jdbc-sink
(2)返回结果显示 mysql-jdbc-sink 的信息,包括租户、命名空间和名称等。
{
"tenant": "public",
"namespace": "default",
"name": "mysql-jdbc-sink",
"className": "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink",
"inputSpecs": {
"test-jdbc": {
"isRegexPattern": false
}
},
"configs": {
"password": "jdbc",
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test_jdbc",
"userName": "root",
"tableName": "test_jdbc"
},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"autoAck": true
}
(1)显示 mysql-jdbc-sink 的状态。
$ bin/pulsar-admin sink status \
--tenant public \
--namespace default \
--name mysql-jdbc-sink
(2)返回结果显示 mysql-jdbc-sink 的状态信息,包括实例数量、是否正在运行和 worker ID 等。
{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReadFromPulsar" : 0,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"numSinkExceptions" : 0,
"latestSinkExceptions" : [ ],
"numWrittenToSink" : 0,
"lastReceivedTime" : 0,
"workerId" : "c-standalone-fw-tengdeMBP.lan-8080"
}
} ]
}
(1)停止 mysql-jdbc-sink。
$ bin/pulsar-admin sink stop \
--tenant public \
--namespace default \
--name mysql-jdbc-sink \
--instance-id 0
(2)如果出现以下信息,则说明停止成功。
Stopped successfully
(1)启动 mysql-jdbc-sink。
$ bin/pulsar-admin sink start \
--tenant public \
--namespace default \
--name mysql-jdbc-sink \
--instance-id 0
(2)如果出现以下信息,则说明启动成功。
Started successfully
(1)重启 mysql-jdbc-sink。
$ bin/pulsar-admin sink restart \
--tenant public \
--namespace default \
--name mysql-jdbc-sink \
--instance-id 0
(2)如果出现以下信息,则说明重启成功。
Restarted successfully
(1)将 parallelism 更新至 2。
$ bin/pulsar-admin sink update \
--name mysql-jdbc-sink \
--parallelism 2
(2)如果出现以下信息,则说明更新成功。
Updated successfully
(3)查看 mysql-jdbc-sink 的信息,再次验证更新结果。
$ bin/pulsar-admin sink get \
--tenant public \
--namespace default \
--name mysql-jdbc-sink
(4)Parallelism 为 2,说明已更新成功。
{
"tenant": "public",
"namespace": "default",
"name": "mysql-jdbc-sink",
"className": "org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink",
"inputSpecs": {
"test-jdbc": {
"isRegexPattern": false
}
},
"configs": {
"password": "jdbc",
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test_jdbc",
"userName": "root",
"tableName": "test_jdbc"
},
"parallelism": 2,
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"autoAck": true
}
(1)删除 mysql-jdbc-sink。
$ bin/pulsar-admin sink delete \
--tenant public \
--namespace default \
--name mysql-jdbc-sink
(2)如果出现以下信息,则说明删除成功。
Deleted successfully
(3)查看 mysql-jdbc-sink 的信息,再次验证删除结果。
$ bin/pulsar-admin sink get \
--tenant public \
--namespace default \
--name mysql-jdbc-sink
(4)mysql-jdbc-sink 不存在,说明已删除成功。
HTTP 404 Not Found
Reason: Sink mysql-jdbc-sink doesn't exist
更多关于 localrun 的使用示例,参阅上文 6. 上传 schema,启动 sink 的第 3 步。
总结
本文介绍了 sink 的功能,并演示了如何创建与使用 JDBC sink 与 MySQL 进行连接。
更多关于 Pulsar connector 的信息,参阅 Pulsar Connector 预览
更多关于 Pulsar source 的信息,参阅 Pulsar Source 入门篇
作者 | tuteng
审校 | Anonymitaet
编辑 | Irene
社区福利推荐
HBase 技术社区
Apache HBase技术社区,研究探讨HBase内核原理,源码剖析,周边生态以及实践应用,汇集众多Apache HBase PMC & Committer以及爱好使用者,提供一线Apache HBase企业实战以及Flink集成等资讯。
Apache Kylin
Apache Kylin 公众号,介绍 Kylin 的功能特性、应用案例、经验分享、社区资讯、活动等。开源大数据分布式 OLAP 引擎 Apache Kylin 于 2014 年开源,在 2015 年和 2016 年连续获得 InfoWorld 的 BOSSIE 奖:年度最佳开源大数据工具奖,发展至今在全球已经拥有超过 1000 家企业用户。作为首个被 Apache 软件基金会认证的由中国人主导的顶级开源项目,Kylin 为万亿数据提供亚秒级查询,并可以和现有的 Hadoop/Spark 及 BI 无缝集成。
Ververica
Apache Flink 社区公众号,由 Apache Flink Community China 运营管理,旨在联合国内的 Flink 大 V,向国内宣传和普及 Flink 相关的技术。公众号将持续输出 Flink 最新社区动态,入门教程、Meetup 资讯、应用案例以及源码解析等内容,希望联合社区同学一起推动国内大数据技术发展。
Apache Pulsar
Apache Pulsar 是下一代云原生流数据平台,助力企业快速分析实时数据,激活数据价值,实现 C 位出道。这里是 Pulsar 前沿技术的传播圣地,也是技术爱好者、开发者和终极用户时刻关注的技术平台。我们定时分享 Pulsar 优质内容,包括社区活动、技术文章、用户案例、行业动态和热点话题等,让你全面拥抱 Pulsar 的一手讯息。Apache Pulsar,助力千万企业和技术人开疆拓土、共同成长。
Apache Pulsar 是下一代云原生分布式流数据平台,它源于 Yahoo,2016 年 12 月开源,2018 年 9 月正式成为 Apache 顶级项目,逐渐从单一的消息系统演化成集消息、存储和函数式轻量化计算的流数据平台。在 Apache Pulsar 快速发展的过程中,社区的伙伴们也致力于硅谷以外的布道之旅,在中国社区开始了不平凡的历程。8 月 17 日,来自 Yahoo!Japan、腾讯、智联招聘、EMQ、Apache Fink 和 Apache Pulsar 社区的开源爱好者们将齐聚一堂,共同探讨 Pulsar 的用户案例、最佳实践和 Pulsar 特性等等,包括:
Apache Pulsar at Yahoo! Japan
智联招聘如何参与社区开发以及 Key_Shared 等近期贡献特性详解
Apache Pulsar 在腾讯计费场景下的实践
Apache Pulsar 在 EMQ 物联网平台产品 ActorCloud 上的应用
Pulsar 2.5.0 事务功能预览
Apache Pulsar 和大数据生态的集成与实践
监控流系统中的 Flink 状态管理
扫描下图二维码或点击【阅读原文】,报名参加 Pulsar Beijing Meetup